#include "config.h"

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"
#include "volume_proto.h"

#include "lsv_wbuffer_internal.h"
#include "lsv_bitmap_snap.h"


/**
 * @brief Put the write buffer's segment bookkeeping into its empty state.
 *
 * Initializes the work and ready segment lists and clears the
 * "last written position" trackers. No segment is allocated here.
 *
 * @param lsv_info volume whose write buffer (lsv_info->wbuf) is reset
 * @return always 0
 */
int lsv_wbuffer_segment_init(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wb = lsv_info->wbuf;

        // nothing has been written yet: NULL / -1 mean "no previous position"
        wb->last_page_idx = -1;
        wb->last_chunk_idx = -1;
        wb->last_segment = NULL;

        count_list_init(&wb->ready_list);
        count_list_init(&wb->work_list);

        // TODO malloc all

        return 0;
}

/**
 * @brief Free every segment still linked on the work and ready lists.
 *
 * Work-list segments have each of their hash index hooks unlinked before
 * the segment itself is released; ready-list segments are released without
 * touching their hash hooks.
 * NOTE(review): confirm the ready-list hooks are guaranteed to be unlinked
 * already, otherwise stale hooks may remain in the hash index.
 *
 * @param lsv_info volume whose write buffer segments are destroyed
 * @return always 0
 */
int lsv_wbuffer_segment_destroy(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wb = (lsv_wbuf_t *) lsv_info->wbuf;
        lsv_wbuf_segment_t *seg = NULL;
        struct list_head *cur, *tmp;
        lsv_s32_t chunk_no = 0;
        lsv_s32_t page_no = 0;
        lsv_s32_t slot = 0;

        // segments on the work list
        list_for_each_safe(cur, tmp, &wb->work_list.list){
                seg = list_entry(cur, lsv_wbuf_segment_t, work_hook);
                yfree((void **)&seg->lsv_chunk);

                // detach every (chunk, page) slot of this segment from the hash index
                for(chunk_no = 0; chunk_no < LSV_WBUF_CHUNK_NUM; chunk_no++){
                        for(page_no = 0; page_no < LSV_WBUF_PAGE_NUM; page_no++){
                                slot = chunk_no * LSV_WBUF_PAGE_NUM + page_no;
                                list_del_init(&seg->hash_block_array[slot].hook);
                        }
                }

                count_list_del_init(&seg->work_hook, &wb->work_list);
                yfree((void **)&seg);
        }

        // segments on the ready list
        list_for_each_safe(cur, tmp, &wb->ready_list.list){
                seg = list_entry(cur, lsv_wbuf_segment_t, ready_hook);
                yfree((void **)&seg->lsv_chunk);

                count_list_del_init(&seg->ready_hook, &wb->ready_list);
                yfree((void **)&seg);
        }

        return 0;
}

/**
 * @brief Allocate one zeroed segment and append it to the tail of the work list.
 *
 * The segment's chunk array (LSV_WBUF_CHUNK_NUM entries) is allocated and
 * zeroed separately, and each chunk is stamped with its position in the
 * array. The hash block array embedded in the segment is cleared as well.
 *
 * @param lsv_info volume whose write buffer receives the new segment
 * @return 0 on success, otherwise the error returned by ymalloc()
 */
int lsv_wbuffer_segment_malloc(lsv_volume_proto_t *lsv_info){
        lsv_wbuf_t *wb = (lsv_wbuf_t *) lsv_info->wbuf;
        lsv_wbuf_segment_t *seg = NULL;
        int ret;

        ret = ymalloc((void **)&seg, sizeof(*seg));
        if(unlikely(ret)){
                GOTO(EXIT1, ret);
        }
        memset(seg, 0, sizeof(*seg));

        // chunk array
        ret = ymalloc((void **)&seg->lsv_chunk, sizeof(*seg->lsv_chunk) * LSV_WBUF_CHUNK_NUM);
        if(unlikely(ret)){
                GOTO(EXIT2, ret);
        }
        memset(seg->lsv_chunk, 0, sizeof(*seg->lsv_chunk) * LSV_WBUF_CHUNK_NUM);

        // each chunk remembers its own slot number inside the segment
        for(int chunk_no = 0; chunk_no < LSV_WBUF_CHUNK_NUM; chunk_no++){
                lsv_chunk_buf_t *chk = &seg->lsv_chunk[chunk_no];

                chk->idx = chunk_no;
                DINFO("malloc chunk_id %d %u\n", chunk_no, chk->idx);
        }

        // hash index slots, one per (chunk, page)
        memset(&seg->hash_block_array, 0, sizeof(index_block) * LSV_WBUF_CHUNK_NUM * LSV_WBUF_PAGE_NUM);

        INIT_LIST_HEAD(&seg->ready_hook);
        INIT_LIST_HEAD(&seg->work_hook);
        count_list_add_tail(&seg->work_hook, &wb->work_list);

        DINFO("segment work %u ready %u\n", wb->work_list.count, wb->ready_list.count);

        return 0;
EXIT2:
        // the chunk array was not allocated; only the segment header needs freeing
        yfree((void **)&seg);
EXIT1:
        return ret;
}

/**
 * @brief Return the current write point, i.e. the segment at the tail of
 *        the work list.
 *
 * @param lsv_info volume whose write buffer is inspected
 * @return the tail segment of the work list, or NULL when the list is empty
 */
lsv_wbuf_segment_t *get_current_segment(lsv_volume_proto_t * lsv_info) {
        lsv_wbuf_t *wb = lsv_info->wbuf;
        struct list_head *head = &wb->work_list.list;

        if (!list_empty_careful(head)) {
                return list_entry(head->prev, lsv_wbuf_segment_t, work_hook);
        }

        return NULL;
}

/**
 * @brief Try to make a fresh segment the write point.
 *
 * First attempts to recycle a segment through lsv_wbuffer_segment_gc().
 * If nothing could be recycled, a new segment is allocated as long as the
 * total number of segments is below LSV_WBUF_CHUNK_GROUP_MAX; otherwise the
 * caller is parked on wbuf->full_cond.
 *
 * @pre the current write point is reclaimable.
 * @param lsv_info volume whose write buffer is advanced
 * @return > 0 when a segment was recycled (value from lsv_wbuffer_segment_gc()),
 *         0 after a successful allocation or after waiting,
 *         otherwise the error from lsv_wbuffer_segment_malloc().
 *         NOTE(review): a positive success value and an error code are not
 *         distinguishable by sign — callers must take care.
 */
int get_next_segment(lsv_volume_proto_t * lsv_info) {
        lsv_wbuf_t *wb = (lsv_wbuf_t *)lsv_info->wbuf;
        int ret;
        int total;

        ret = lsv_wbuffer_segment_gc(lsv_info);
        if (ret > 0) {
                // a recycled segment is available
                return ret;
        }

        total = wb->work_list.count + wb->ready_list.count;
        if (total >= LSV_WBUF_CHUNK_GROUP_MAX) {
                // NOTE(review): the signaller of full_cond is not visible in this file
                DINFO("wbuf segment full, waiting......\n");
                co_cond_wait(&wb->full_cond);
                return ret;
        }

        // TODO yield, double check
        ret = lsv_wbuffer_segment_malloc(lsv_info);
        if(unlikely(ret)) {
                GOTO(EXIT, ret);
        }

EXIT:
        return ret;
}

/**
 * @brief Placeholder segment check; performs no work.
 *
 * @param lsv_info unused
 * @param io_opt   unused
 * @param segment  unused; not written
 * @return always 0
 */
int lsv_wbuffer_segment_check(lsv_volume_proto_t *lsv_info, lsv_io_opt_t *io_opt, lsv_wbuf_segment_t **segment) {
        (void)lsv_info;
        (void)io_opt;
        (void)segment;

        return 0;
}

/**
 * @brief Make sure the current segment has a usable page, recycling or
 *        allocating a segment when the write point is missing or full.
 *
 * On success *segment is the tail segment of the work list and its
 * chunk_idx refers to a chunk that is not full (the index is advanced past
 * a full chunk here).
 *
 * get_next_segment() may wait on the write buffer's full condition, so this
 * function can yield.
 *
 * @param lsv_info volume whose write buffer is used
 * @param io_opt   not used by this function
 * @param segment  [out] selected segment; left NULL on failure, so callers
 *                 must check the return value before using it
 * @return 0 on success; otherwise the non-zero value returned by
 *         get_next_segment() when no segment could be recycled or allocated
 */
int lsv_wbuffer_segment_check_with_malloc(lsv_volume_proto_t *lsv_info,
                                          lsv_io_opt_t *io_opt,
                                          lsv_wbuf_segment_t **segment) {
        int ret = 0;
        lsv_wbuf_segment_t *_segment;
        lsv_chunk_buf_t *chunk;

        *segment = NULL;

        _segment = get_current_segment(lsv_info);
        while (_segment == NULL || segment_is_full(_segment)) {
                ret = get_next_segment(lsv_info);
                _segment = get_current_segment(lsv_info);

                // get_next_segment() returns > 0 after recycling a segment and 0
                // after allocating or waiting, so a non-zero value is an error
                // only if there is still nothing to write into. Give up in that
                // case instead of retrying forever without yielding.
                if (unlikely(ret != 0 && (_segment == NULL || segment_is_full(_segment)))) {
                        GOTO(EXIT, ret);
                }
        }

        YASSERT(_segment != NULL);

        DINFO("segment %p selected, chunk %d\n", _segment, _segment->chunk_idx);

        YASSERT(_segment->chunk_idx < LSV_WBUF_CHUNK_NUM);

        chunk = &_segment->lsv_chunk[_segment->chunk_idx];
        if (chunk_is_full(chunk)) {
                // current chunk is exhausted: move to the next one, which must be untouched
                _segment->chunk_idx++;
                YASSERT(_segment->chunk_idx < LSV_WBUF_CHUNK_NUM);
                YASSERT(_segment->lsv_chunk[_segment->chunk_idx].page_idx == 0);
        } else {
                YASSERT(chunk->page_idx < LSV_WBUF_PAGE_NUM - 1);
        }

        *segment = _segment;
        return 0;
EXIT:
        return ret;
}

/**
 * @brief Tell whether any chunk of the segment is still marked
 *        LSV_WBUF_CHUNK_FULL.
 *
 * Every chunk is visited (and logged) even after a match was found.
 *
 * @param segment segment to inspect
 * @return TRUE if at least one chunk has in_use == LSV_WBUF_CHUNK_FULL,
 *         FALSE otherwise
 */
static inline int _segment_is_in_use(const lsv_wbuf_segment_t *segment) {
        int busy_chunks = 0;
        int i = 0;

        while (i < LSV_WBUF_CHUNK_NUM) {
                lsv_chunk_buf_t *chk = &segment->lsv_chunk[i];

                if (chk->in_use == LSV_WBUF_CHUNK_FULL) {
                        busy_chunks++;
                }

                DINFO("check chunk %p %p page_idx %d in_use %d %d\n",
                      segment,
                      chk,
                      chk->page_idx,
                      chk->in_use,
                      busy_chunks);
                i++;
        }

        return (busy_chunks > 0) ? TRUE : FALSE;
}

/**
 * @brief Rewind the segment's chunk cursor if none of its chunks is in use.
 *
 * @param segment segment to test
 * @return TRUE when the segment was idle and chunk_idx was reset to 0,
 *         FALSE when some chunk is still in use (segment left untouched)
 */
static inline int _segment_test_and_reset(lsv_wbuf_segment_t *segment) {
        int idle = !_segment_is_in_use(segment);

        if (idle) {
                segment->chunk_idx = 0;
                return TRUE;
        }

        return FALSE;
}

/**
 * @brief Move reclaimable segments from the head of the work list to the
 *        ready list.
 *
 * Segments are visited from the head of the work list. Each visited segment
 * must be full; it is moved to the tail of the ready list when none of its
 * chunks is in use. The scan stops at the first segment that is still in
 * use, so segments are reclaimed in list order.
 *
 * @param wbuf      write buffer owning the lists
 * @param skip_last non-zero to leave the tail of the work list (the current
 *                  write point) alone
 * @return always 0
 */
static int __gc_work_list(lsv_wbuf_t *wbuf, int skip_last) {
        int ready_count = 0;
        lsv_wbuf_segment_t *segment = NULL;
        struct list_head *pos, *next;

        list_for_each_safe(pos, next, &wbuf->work_list.list){
                // Identify the tail by its link, not by a counter: work_list.count
                // shrinks as segments are moved, so comparing a running counter
                // against it ends the scan before the real tail is reached.
                if (skip_last && pos->next == &wbuf->work_list.list) {
                        break;
                }

                segment = list_entry(pos, lsv_wbuf_segment_t, work_hook);
                DINFO("work segment %p chunk_idx %u\n", segment, segment->chunk_idx);
                // YASSERT(segment->chunk_idx == LSV_WBUF_CHUNK_NUM - 1);
                YASSERT(segment_is_full(segment));

                // stop at the first segment that still has a chunk in use
                if (!_segment_test_and_reset(segment))
                        break;

                ready_count++;
                DINFO("segment %p ready %d\n", segment, ready_count);
                count_list_del_init(&segment->work_hook, &wbuf->work_list);
                count_list_add_tail(&segment->ready_hook, &wbuf->ready_list);
        }

        DERROR("segment work %u ready %u\n", wbuf->work_list.count, wbuf->ready_list.count);
        return 0;
}

/**
 * @brief Reclaim the work list (including the write point, i.e. its last
 *        segment), then promote one segment from the ready list to become
 *        the current write point.
 *
 * The promoted segment is appended to the tail of the work list and its
 * hash index is recycled through lsv_wb_hash_index_gc().
 *
 * @pre the write point is reclaimable.
 * @param lsv_info volume whose write buffer is collected
 * @return > 0 (1) when a ready segment was promoted
 * @return == 0 when the ready list was empty
 */
int lsv_wbuffer_segment_gc(lsv_volume_proto_t *lsv_info){
        lsv_wbuf_t *wb = lsv_info->wbuf;
        struct list_head *ready_head = &wb->ready_list.list;
        lsv_wbuf_segment_t *seg = NULL;

        __gc_work_list(wb, 0);

        // nothing ready to reuse
        if (ready_head->next == ready_head) {
                return 0;
        }

        // take the oldest ready segment and make it the new tail of the work list
        seg = list_entry(ready_head->next, lsv_wbuf_segment_t, ready_hook);
        count_list_del_init(&seg->ready_hook, &wb->ready_list);
        count_list_add_tail(&seg->work_hook, &wb->work_list);

        // TODO recycle the index only when the segment is really about to be used
        lsv_wb_hash_index_gc(seg);

        return 1;
}

#if 0
/*
 * Disabled: this function is compiled out by the surrounding #if 0.
 *
 * Computes the number of bytes still free in a segment. A full segment
 * yields 0. Otherwise the result is the space of the chunks after the
 * current one plus the remaining pages of the current chunk; in both terms
 * one page per chunk is excluded from the count.
 */
int lsv_wbuffer_segment_get_free_size(lsv_wbuf_segment_t *segment) {
        if (segment_is_full(segment)) {
                return 0;
        }

        lsv_chunk_buf_t *chunk;

        // chunk currently being filled
        chunk = &segment->lsv_chunk[segment->chunk_idx];
        return (LSV_WBUF_CHUNK_NUM - segment->chunk_idx - 1) * (LSV_WBUF_CHUNK_SIZE - LSV_WBUF_PAGE_SIZE) +
                (LSV_WBUF_PAGE_NUM - chunk->page_idx - 1) * LSV_WBUF_PAGE_SIZE;
}
#endif

/**
 * @brief Mark a chunk as no longer in use and reclaim finished segments.
 *
 * After the chunk's in_use flag is cleared, the work list is scanned for
 * reclaimable segments; the tail of the work list is skipped.
 *
 * @param lsv_info volume whose write buffer owns the chunk
 * @param chunk    chunk being committed
 * @param in_use   new in_use value; must be 0 (asserted)
 * @return always 0
 */
int lsv_wbuffer_segment_commit(lsv_volume_proto_t *lsv_info, lsv_chunk_buf_t *chunk, int in_use) {
        lsv_wbuf_t *wb;

        // TODO error handling
        YASSERT(0 == in_use);

        // TODO notify
        chunk->in_use = (lsv_s8_t)in_use;
        // TODO MUST NOT
        // chunk->chunk_lsn = 0;

        wb = lsv_info->wbuf;
        __gc_work_list(wb, 1);

        return 0;
}

/**
 * @brief Sanity-check the position of the page about to be written and
 *        remember it as the last written position.
 *
 * The last position is kept per write buffer (wbuf->last_*). An earlier
 * version used function-level static variables, which could not work with
 * multiple volumes and raised reentrancy / thread-safety concerns.
 *
 * Layout being checked: chunk groups (segments) form a ring queue; chunks
 * inside a group are ordered 0..N-1; pages inside a chunk advance from 0
 * upwards and the last page index acts as the "full" watermark. Hence three
 * levels of advance:
 *  - switching to another chunk group,
 *  - switching to the next chunk inside the group,
 *  - switching to the next page inside the same chunk.
 *
 * @param lsv_info volume whose write buffer is tracked
 * @param segment  segment receiving the page
 * @param page_idx page slot inside the segment's current chunk
 */
static void _check_page_rule(lsv_volume_proto_t *lsv_info, lsv_wbuf_segment_t *segment, uint32_t page_idx) {
        lsv_wbuf_t *wb = lsv_info->wbuf;

        // the watermark slot itself (LSV_WBUF_PAGE_NUM - 1) must never be written
        YASSERT(page_idx < LSV_WBUF_PAGE_NUM - 1);

        DINFO("vol %llu last %p %d %d new %p %d %d\n",
              (LLU)lsv_info->ino,
              wb->last_segment, wb->last_chunk_idx, wb->last_page_idx,
              segment, segment->chunk_idx, page_idx);

#if 1
        // A different chunk index than last time means a new chunk was entered,
        // so writing must start at its first page.
        // When the chunk index is unchanged nothing is asserted: after a flush
        // the same chunk index (e.g. 0) of a segment may be selected again, so
        // "page_idx == last_page_idx + 1" does not always hold.
        if (wb->last_chunk_idx != -1 && segment->chunk_idx != wb->last_chunk_idx) {
                // TODO core
                YASSERT(page_idx == 0);
        }

        wb->last_page_idx = page_idx;
        wb->last_chunk_idx = segment->chunk_idx;
        wb->last_segment = segment;
#endif
}

/**
 * @brief Hand a chunk of the current segment over to the log writer.
 *
 * Two modes:
 *  - flush != 0: @p chunk must be NULL; the chunk under the current
 *    segment's chunk_idx is pushed (it may be only partially filled) and
 *    chunk_idx is then advanced so the chunk is not reused. Nothing is done
 *    when there is no current segment or the segment is already full.
 *  - flush == 0: @p chunk must be a chunk whose page_idx has reached
 *    LSV_WBUF_PAGE_NUM - 1.
 *
 * A chunk with page_idx == 0 holds no pages and is not pushed.
 *
 * Open question from the original author: is splitting an IO allowed?
 *
 * @deprecated
 * @param lsv_info volume whose write buffer owns the chunk
 * @param chunk    chunk to push (flush == 0) or NULL (flush != 0)
 * @param flush    non-zero to push the current, possibly partial, chunk
 * @return always 0
 */
int lsv_wbuffer_segment_push_log(lsv_volume_proto_t *lsv_info, lsv_chunk_buf_t *chunk, int flush) {
        int ret = 0;
        lsv_wbuf_segment_t *segment;

        lsv_info->log_write_count++;

        // TODO the two differ:
        // if the segment was switched and the previous segment has already been
        // reclaimed, data becomes inconsistent
        // lsv_wbuf_segment_ensure(lsv_info, &segment);
        segment = get_current_segment(lsv_info);
        if (flush && segment == NULL) {
                return 0;
        }

        YASSERT(segment != NULL);

        if (flush) {
                // Does the current segment hold unflushed data?
                // When this is the last chunk of the segment and it is full,
                // the flush and non-flush cases are handled differently.
                if (segment_is_full(segment)) {
                        return 0;
                }

                YASSERT(chunk == NULL);
                YASSERT(segment->chunk_idx < LSV_WBUF_CHUNK_NUM);
                chunk = &segment->lsv_chunk[segment->chunk_idx];
        } else {
                YASSERT(chunk != NULL);
                YASSERT(chunk->page_idx == LSV_WBUF_PAGE_NUM - 1);
        }

        DERROR("vol %llu segment %p chunk %p %d+ page %d flush %d\n",
               (LLU)lsv_info->ino,
               segment,
               chunk,
               segment->chunk_idx,
               chunk->page_idx,
               flush);

        // On flush the chunk may be only partially filled (less than 1M).
        if (chunk->page_idx > 0) {
                // TODO if tail is full

                // slot 0 of hlog_arg carries the chunk header
                chunk->hlog_arg[0].head.timestamp = gettime();
                chunk->hlog_arg[0].head.page_count = chunk->page_idx;
                chunk->hlog_arg[0].head.crc = 0;

                // every conversion specifier needs its own argument: pass the
                // chunk id explicitly for "chunk_id %u"
                DERROR("vol %llu chunk_id %u page_idx %u snap_id %u\n",
                       (LLU)lsv_info->ino,
                       chunk->idx,
                       chunk->page_idx,
                       lsv_bitmap_snap_get_max_id(lsv_info));

                chunk->in_use = LSV_WBUF_CHUNK_FULL;
                lsv_wbuffer2log(lsv_info, chunk);

                if (flush) {
                        // TODO flush, switch to next chunk
                        // Reusing this chunk would break the rule that a segment is
                        // reclaimed as a whole, so advance the current-chunk pointer
                        // (the watermark) to the next chunk.
                        segment->chunk_idx++;
                }
        }

        return ret;
}

/**
 * @brief Copy one page of IO data into the write buffer and index it.
 *
 * Steps: obtain a segment with a free page, copy io_opt->size bytes from
 * @p page_buf into the current chunk's page slot, record lba / size /
 * snapshot id in the chunk's log metadata (slot page_idx + 1; slot 0 is the
 * chunk header), insert the page into the segment's hash index, and advance
 * the chunk's page cursor. When the chunk becomes full it is pushed to the
 * log.
 *
 * @param lsv_info volume being written
 * @param io_opt   IO descriptor; lsn, lba and size are read.
 *                 io_opt->lsn must not be smaller than the previous call's
 *                 lsn (asserted).
 * @param page_buf source buffer the page data is popped from
 * @return 0 on success, otherwise the error reported while obtaining a
 *         segment or pushing the full chunk
 */
int lsv_wbuffer_segment_append_page(lsv_volume_proto_t *lsv_info, lsv_io_opt_t *io_opt, buffer_t *page_buf) {
        int ret = 0;
        lsv_wbuf_segment_t *segment = NULL;
        lsv_chunk_buf_t *chunk_buf = NULL;
        lsv_log_meta_t *hlog_arg = NULL;
        lsv_u32_t tail_chunk_idx = 0;
        lsv_s32_t page_idx = 0;

        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        if (wbuf->__last_psn != 0) {
                YASSERT(io_opt->lsn >= wbuf->__last_psn);
        }
        wbuf->__last_psn = io_opt->lsn;

        // TODO this call may yield; re-entrancy needs careful analysis
        ret = lsv_wbuffer_segment_check_with_malloc(lsv_info, io_opt, &segment);
        if (unlikely(ret)) {
                // no segment available: do not touch the (NULL) segment pointer
                GOTO(EXIT, ret);
        }
        YASSERT(segment != NULL);

        // no yield from here on, so no synchronization is needed
        tail_chunk_idx = segment->chunk_idx;
        chunk_buf = &segment->lsv_chunk[tail_chunk_idx];
        page_idx = chunk_buf->page_idx;

        // TODO the wbuf fill order is not the same as the WAL LSN order
        chunk_buf->chunk_psn = (++wbuf->last_psn);
        // chunk_buf->in_use = LSV_WBUF_CHUNK_FULL;

        _check_page_rule(lsv_info, segment, page_idx);

        // NOTE(review): assumes io_opt->size <= LSV_WBUF_PAGE_SIZE, otherwise the
        // copy spills into the next page slot — confirm against callers
        mbuffer_popmsg(page_buf, (lsv_s8_t *) (&chunk_buf->page_buf) + page_idx * LSV_WBUF_PAGE_SIZE, io_opt->size);

        uint32_t snap_id = lsv_bitmap_snap_get_max_id(lsv_info);

        // slot 0 is the chunk header, so page N is described by slot N + 1
        hlog_arg = chunk_buf->hlog_arg;
        hlog_arg[page_idx + 1].hlog.lba = io_opt->lba;
        hlog_arg[page_idx + 1].hlog.size = io_opt->size;
        hlog_arg[page_idx + 1].hlog.snap_id = snap_id;

        chunk_buf->dirty =(lsv_s8_t) 1;

        DERROR("vol %llu psn %llu %llu lsn %llu chk_lsn %llu lba %llu size %d seg %p %d pg_idx %d snap_id %u\n",
               (LLU)lsv_info->ino,
               (LLU)wbuf->last_psn,
               (LLU)wbuf->commit_psn,
               (LLU)io_opt->lsn,
               (LLU)chunk_buf->chunk_lsn,
               (LLU)io_opt->lba,
               io_opt->size,
               segment,
               tail_chunk_idx,
               page_idx,
               snap_id);

        // TODO LSNs may arrive out of order and are not monotonically increasing.
        // The LSN is the WAL order; the in-memory part need not follow it, so
        // the chunk only keeps the largest LSN seen.
        if (chunk_buf->chunk_lsn < io_opt->lsn) {
                chunk_buf->chunk_lsn = io_opt->lsn;
        }

        // update index
        lsv_wb_hash_index_insert(lsv_info, segment, io_opt->lba, tail_chunk_idx, page_idx);

        // next page
        YASSERT(chunk_buf->page_idx == page_idx);
        chunk_buf->page_idx++;

        if (chunk_is_full(chunk_buf)) {
                // TODO if tail is full, next chunk
                DINFO("segment %p chunk %p %u page_idx %u\n", segment, chunk_buf, tail_chunk_idx, chunk_buf->page_idx);

                ret = lsv_wbuffer_segment_push_log(lsv_info, chunk_buf, 0);
                if (unlikely(ret)) {
                        GOTO(EXIT, ret);
                }
        }

        return 0;
EXIT:
        return ret;
}
